iT邦幫忙

2024 iThome 鐵人賽

0
Software Development

Django 2024: 從入門到SaaS實戰系列 第 30

Django Channels、Async 和 Celery 的協同之舞: 畫龍點睛之筆 納入Async function

  • 分享至 

  • xImage
  •  

在之前我們已經把核心的功能補齊了,但是為了專案的完整度以及提升使用者體驗

我們會利用非同步的方式來進一步提升專案的完整性

以下是專案的系列文章,推薦第一次閱讀這個系列的讀者可以先看前面的文章了解整個專案架構

Django Channels、Async 和 Celery 的協同之舞: DocuMind專案介紹

Django Channels、Async 和 Celery 的協同之舞: 認識向量資料與Celery

Django Channels、Async 和 Celery 的協同之舞: 打造智能文檔問答系統

Django Channels、Async 和 Celery 的協同之舞: 透過channels建立AI聊天室

今日重點如下:

  • Async in Django
  • 核心功能開發
    • 將consumer由同步修改成非同步
    • 建立chat與聊天記錄持久化
    • 開啟聊天室啟動時載入先前內容
    • 向量資料根據聊天室隔離
    • DEMO

Async in Django

Django自從3.x版後就開始提升非同步的支持,並且是透過python的asgiref套件來實現

隨著版本更新,Django也在持續提升對於非同步的支持,例如5.0版後許多裝飾器也支援非同步

如果是在視圖中想使用非同步方法

以我們consumer中的connect方法為例:

async def connect(self):
    self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
    self.room_group_name = f"chat_{self.room_name}"

    # 加入room group
    await self.channel_layer.group_add(self.room_group_name, self.channel_name)

可以看到寫的方法跟在JavaScript中的寫法不會有太大的區別,就是在函式名稱前面加上async,而內部的非同步方法則加上await。有設置await就一定要在函式加上async

但是在Django中有許多核心功能是有Async unsafe的情形,因為有全局狀態因此不支援協程

因此在Django特別要注意所處的方法中是非同步還是支援同步

  • 同步方法中想調用非同步
from asgiref.sync import sync_to_async, async_to_sync

def connect(self):
    async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
    )

  • 在非同步方法中調用同步
async def async_view(request):
    # 將同步函數轉換為異步
    sync_function_async = sync_to_async(sync_function)
    result = await sync_function_async()
    # 處理 result
  • 如果是有關資料庫的操作,則建議使用database_sync_to_async裝飾器
from asgiref.sync import database_sync_to_async

async def connect(self):
   # 建立或獲取聊天室
   self.chat = await self.get_or_create_chat(self.scope["user"])

@database_sync_to_async
def get_or_create_chat(self, user):
    chat, _ = Chat.objects.get_or_create(room_name=self.room_name, owner=user)
    return chat

Django也開始提升了ORM的非同步操作,不過因為我沒有實際操作過,如果有興趣的可以看官方文檔的補充

https://docs.djangoproject.com/en/4.2/topics/async/#queries-the-orm

核心功能開發

介紹完在Django中怎麼使用非同步進行開發,那接著就來將我們的專案納入Async function

程式碼:https://github.com/class83108/DocuMind/tree/async_chat

將consumer由同步修改成非同步

將consumer轉換成非同步形式

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"

        # 加入room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)

        await self.accept()

    async def disconnect(self, close_code):
        # 離開room group
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]
        print(f"Received message: {message}", f"{time.time()}")

        # 發送加載中消息到room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {"type": "chat.loading", "message": "正在處理您的請求..."},
        )
        print(f"Sent loading message: {message}", f"{time.time()}")

        # Start async task
        task = await sync_to_async(search_documents_and_answer.delay)(message)

        # Start checking task result
        await self.check_task_result(task.id)

    # Check task result
    async def check_task_result(self, task_id):
        max_attempts = 60  # 最多等待60秒
        attempts = 0
        while attempts < max_attempts:
            task = AsyncResult(task_id)
            if await sync_to_async(task.ready)():
                result = await sync_to_async(lambda: task.result)()
                # 發送消息到room group
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "chat.message",
                        "message": result["answer"],
                        "query": result["query"],
                        "results": result["results"],
                    },
                )
                break
            else:
                # Task not ready, wait for 1 second before checking again
                await asyncio.sleep(1)
                attempts += 1

        if attempts >= max_attempts:
            # 如果超過最大嘗試次數,則發送錯誤消息到room group
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "chat.message",
                    "message": "抱歉,我們無法處理您的請求",
                    "query": "",
                    "results": [],
                },
            )

    # Receive loading message from room group
    async def chat_loading(self, event):
        message = event["message"]
        # Send loading message to WebSocket
        await self.send(text_data=json.dumps({"type": "loading", "message": message}))

    # Receive message from room group
    async def chat_message(self, event):
        message = event["message"]
        query = event.get("query", "")
        results = event.get("results", [])

        # Send message to WebSocket
        await self.send(
            text_data=json.dumps(
                {
                    "type": "message",
                    "message": message,
                    "query": query,
                    "results": results,
                }
            )
        )
  • 改繼承AsyncWebsocketConsumer類別,以支援非同步寫法
  • 方法名稱前面加上async,代表裏面有非同步操作
  • 調用非同步方法時則需要加上await
  • 如果原本是同步方法,要在非同步方法調用,則需要加上sync_to_async
    • 否則還是會讓非同步方法阻塞,失去非同步意義

現在雖然我們的聊天室本身不需要非同步也能順利運作,但是為了應付之後更複雜的需求

轉換成非同步方法還是能夠更好的提升效能

建立chat與聊天記錄持久化

我們嘗試在剛建立webSocket時,就使用非同步的方式建立Chat

在consumer內建立方法,並且使用@database_sync_to_async裝飾器

在Django實現非同步依賴asgiref套件,而在channels的database_sync_to_asyncasgiref.sync.sync_to_async相當,均為讓同步的ORM能在非同步函式中使用

from channels.db import database_sync_to_async

@database_sync_to_async
    def get_or_create_chat(self, user):
        chat, _ = Chat.objects.get_or_create(room_name=self.room_name, owner=user)
        return chat

並且在connect方法中套用

async def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"

        # 加入room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)

        await self.accept()

        # 建立或獲取聊天室
        self.chat = await self.get_or_create_chat(self.scope["user"])

此時我們要來儲存聊天記錄,我們在每一次在群組內接收訊息與傳遞消息,但是如果每一次都要調用資料庫儲存會造成較大的壓力。我們可以在每次接收訊息時與傳遞訊息時,將訊息透過Redis儲存,而在斷連聊天室時再一次性將所有聊天記錄存到資料庫中,當然聰明的你也想到這會需要建立一個額外的任務交給Celery處理

  • 建立添加訊息到Redis的非同步方法
@database_sync_to_async
def add_to_chat_history(self, sender, message):
    history_key = f"chat_history_{self.room_name}"
    history = cache.get(history_key) or []
    history.append({"sender": sender, "message": message})
    cache.set(history_key, history, timeout=None)
  • 並且在接收訊息與發送訊息時調用方法
# Receive message from WebSocket
async def receive(self, text_data):
    text_data_json = json.loads(text_data)
    message = text_data_json["message"]

    # 將消息添加到聊天歷史記錄
    await self.add_to_chat_history(
        sender=self.scope["user"].username, message=message
    )
# Receive message from room group
async def chat_message(self, event):
    message = event["message"]
    query = event.get("query", "")
    results = event.get("results", [])

    # Send message to WebSocket
    await self.send(
            text_data=json.dumps(
                {
                    "type": "message",
                    "message": message,
                    "query": query,
                    "results": results,
                }
            )
    )

    # 將消息添加到聊天歷史記錄
    await self.add_to_chat_history(sender="DocuMind", message=message)
  • disconnect方法中透過任務來儲存聊天記錄
async def disconnect(self, close_code):
    await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        
    # 觸發異步保存任務
    await self.save_chat_history_async()
        
async def save_chat_history_async(self):
    save_chat_history.delay(self.chat.id, self.room_name)
@shared_task
def save_chat_history(chat_id, room_name):
    channel_layer = get_channel_layer()
    history_key = f"chat_history_{room_name}"

    # 從 channel_layer 獲取聊天記錄
    chat_history = async_to_sync(channel_layer.get)(history_key) or []

    try:
        chat = Chat.objects.get(id=chat_id)

        if chat_history:
            # 更新數據庫中的聊天記錄
            chat.history.extend(chat_history)
            chat.save()

            # 清除 channel_layer 中的臨時記錄
            async_to_sync(channel_layer.delete)(history_key)

        return f"Chat history saved for room {room_name}"
    except Chat.DoesNotExist:
        return f"Chat with id {chat_id} does not exist"

我們來看成果

可以看確實聊天記錄都被保存下來了

https://ithelp.ithome.com.tw/upload/images/20241015/20161866HRKgswtrRy.png

https://ithelp.ithome.com.tw/upload/images/20241015/20161866Pb1YJQI5UX.png

開啟聊天室啟動時載入先前內容

我們讓聊天室開啟時載入之前的聊天內容

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        ...

        # 加載聊天歷史記錄
        await self.load_chat_history()

   

    @database_sync_to_async
    def get_chat_history(self):
        return list(self.chat.history)

    async def load_chat_history(self):
        history = await self.get_chat_history()
        # 將整個歷史記錄作為一個批次發送
        await self.send(text_data=json.dumps({"type": "history", "messages": history}))

在前端的部分去做區分

this.socket.onmessage = (event) => {
                        const data = JSON.parse(event.data);
                        if (data.type === 'loading') {
                            console.log("Received loading message");
                            this.addMessage(data.message, 'system');
                        } else if (data.type === 'message') {
                            console.log("Received message");
                            this.handleIncomingMessage(data);
                        } else if (data.type === 'history') {
                            this.handleHistoryMessages(data.messages);
                        }
                    };
                    
 handleHistoryMessages(messages) {
                    console.log("Received history messages");
                    console.log(messages);
                    this.messages = messages.map(msg => ({
                        id: Date.now(),
                        text: msg.message,
                        type: msg.sender === 'DocuMind' ? 'bot' : 'user'
                    }));
                    
                },

但是我這邊有遇到bug,雖然我有添加id在陣列中但是還是無解

去掉後是不影響運作因此我也先進行了刪除,之後有時間再進行優化

Alpine Warning: x-for ":key" is undefined or invalid 

現在我們打開聊天室時,就可以看到先前的對話內容了

https://ithelp.ithome.com.tw/upload/images/20241015/201618666MhI4m1skb.png

向量資料根據聊天室隔離

我們現在所有向量資料是全局使用,但是我們可能會想要在一次問答當中只挑選部分的資料當作參考資料,我們需要完成以下步驟:

  • 建立新的模型來儲存PDF檔的資料,而不是建立文章,且該PDF需要綁定聊天室
  • 建立PDF上傳的API處理相關邏輯
  • PDF上傳並且解析後,透過Celery建立向量資料,同時注意元數據讓之後的搜尋能有依據做篩選
  • 做查詢時,需要根據聊天室來做區隔

首先建立PDF模型並且遷移到資料庫

# chat.models.py

class PDFDocument(models.Model):
    chatroom = models.ForeignKey(
        Chat, on_delete=models.CASCADE, related_name="pdf_documents"
    )
    uploaded_at = models.DateTimeField(auto_now_add=True)
    processed = models.BooleanField(default=False)
    text = models.TextField(default="")

    def __str__(self):
        return f"PDF for {self.chatroom.name}"

建立PDF上傳的API,其中store_pdf_vector之後再補上

# api.views.py

class PDFUploadAndSave(views.APIView):
    def post(self, request):

        pdf_file = request.data.get("pdf_file")

        chat_room_name = request.data.get("chat_room_name")

        if not pdf_file:
            return response.Response(
                {"error": "No PDF file provided"}, status=status.HTTP_400_BAD_REQUEST
            )

        try:
            pdf_content = pdf_file.read()
            text = extract_text(io.BytesIO(pdf_content))
            # 清理文本
            text = clean_text(text)

            # 保存PDF文檔
            chat_room = Chat.objects.get(room_name=chat_room_name)
            pdf_document = PDFDocument(chatroom=chat_room, text=text)
            pdf_document.save()

            # 開始處理PDF文檔
            store_pdf_vector.apply_async((chat_room.id, pdf_document.id))

            return response.Response(
                {
                    "success": "file saved successfully",
                    "file_name": f"{pdf_file.name}",
                },
                status=status.HTTP_201_CREATED,
            )

        except Exception as e:
            print(e)
            return response.Response(
                {"error": "Error processing PDF file"},
                status=status.HTTP_400_BAD_REQUEST,
            )
            
 # api.urls.py
 
 urlpatterns = [
    ...
    path(
        "upload-pdf-and-save/", PDFUploadAndSave.as_view(), name="upload-pdf-and-save"
    ),
]

修改前端頁面,使得聊天室能夠上傳PDF,並且渲染出已經上傳的PDF檔案

<div class="file-group">
            <form id="upload-form">
                {% csrf_token %}
                <div class="input-container">
                    <input type="file" id="pdf_file" name="pdf_file" accept="application/pdf"
                        hx-post="{% url 'api:upload-pdf-and-save' %}"
                        hx-trigger="change"
                        hx-swap="beforeend"
                        hx-encoding="multipart/form-data"
                        hx-vals='{"chat_room_name": "{{ room_name }}"}'
                        >
                        
                </div>
            </form>
            <div id="file-container">
                <ul id="file-list" class="file-list"></ul>
            </div>
        </div>
        
 <script>
        document.body.addEventListener('htmx:afterRequest', function(event) {
            if (event.detail.successful) {
                var response = JSON.parse(event.detail.xhr.responseText);
                if (response.success) {
                    var fileItem = document.createElement('li');
                    fileItem.className = 'pdf-file-item';
                    fileItem.innerHTML = `
                        <span class="pdf-icon"><i class="fa-solid fa-file-pdf"></i></span>
                        <span class="file-name">${response.file_name}</span>
                    `;
                    document.getElementById('file-list').appendChild(fileItem);
                    
                    // 清除文件輸入
                    document.getElementById('pdf_file').value = '';
                }
            }
        });
        </script>

我們在chat.tasks.py中建立對應的方法:

  • store_pdf_vector方法:其實跟之前在儲存文章時差不多,主要是元數據的不同
@shared_task
def store_pdf_vector(chat_id: int, pdf_id: int) -> None:
    try:
        pdf_document = PDFDocument.objects.get(id=pdf_id)
        # 獲取全局 Chroma 客戶端
        vectorstore = get_vectorstore()

        # 文本準備
        text = pdf_document.text

        # 文本分割
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100,
            length_function=len,
        )
        chunks = text_splitter.split_text(text)

        # 創建 Document 對象列表
        documents = [
            Document(
                page_content=chunk,
                metadata={
                    "chat_id": chat_id,
                    "chunk_index": i,
                },
            )
            for i, chunk in enumerate(chunks)
        ]
        # 生成唯一 ID
        uuids = [str(uuid.uuid4()) for _ in range(len(documents))]

        # 將文章添加到現有的向量存儲中
        vectorstore.add_documents(documents=documents, ids=uuids)

    except Exception as e:
        print(f"Error processing article: {str(e)}")
        raise e

  • search_documents_and_answer方法:跟之前的方法幾乎沒有差別,唯一不同的是在搜尋範圍中加入聊天室的篩選
@shared_task
def search_documents_and_answer(query: str, chat_id: int, num_results: int = 5) -> dict:
    vectorstore = get_vectorstore()

    # 執行相似性搜索,只搜索特定 chat_id 的文檔
    results = vectorstore.similarity_search_with_score(
        query,
        k=num_results,
        filter={"chat_id": chat_id},  # 添加 filter 參數來限制搜索範圍
    )

    print(f"Found {len(results)} results for chat_id: {chat_id}")

    if len(results) == 0:
        return {"query": query, "answer": "No results found", "results": []}

    # 格式化結果
    formatted_results = []
    context = ""
    for doc, score in results:
        formatted_results.append(
            {"content": doc.page_content, "metadata": doc.metadata, "score": score}
        )
        context += doc.page_content + "\n\n"

    # 初始化語言模型
    llm = OpenAI(temperature=0, openai_api_key=settings.OPENAI_API_KEY)

    # 創建提示模板
    prompt = PromptTemplate(
        input_variables=["context", "query"],
        template="根據以下信息回答問題:\n\n{context}\n\n問題: {query}\n\n答案:",
    )

    # 建立鏈 - 輸入提示,語言模型,輸出解析器
    chain = prompt | llm | StrOutputParser()

    # 調用鏈 - 將上下文和查詢作為輸入取代得答案
    answer = chain.invoke({"context": context, "query": query})

    return {
        "query": query,
        "answer": answer,
        "results": formatted_results,
    }

最後修改consumer中調用的方法,讓我們現在已聊天室為單位進行向量資料的篩選

# from articles.tasks import search_documents_and_answer
from chat.tasks import search_documents_and_answer

DEMO

既然都完成了功能,就來看最後的結果如何~

在還沒有任何檔案上傳前,因為沒有儲存到向量資料庫,所以不會返回結果
https://ithelp.ithome.com.tw/upload/images/20241015/20161866J2pKfyZ9am.png

但是當我們上傳資料後,將資料存入向量資料庫後,就能成功搜尋到相關資料了

https://ithelp.ithome.com.tw/upload/images/20241015/20161866ETh8sFddKw.png

恭喜!透過完成最後的功能,來讓我們的專案更符合現實的應用

今日總結

首先介紹Django在進行非同步的開發中,需要注意的事項與常見使用方式

最後我們透過將Async function納入專案之中,不但提升了使用體驗,也提高了專案的上限

雖然以一個專案來說,還有太多需要補齊的地方了

但是以一個介紹channels、Celery與Async function的小專案來說,我們已經把大部分需要知道的技術點完成,並且做了很好的結合~

至此這個專案告一段落,感謝所有看到這裡的讀者

參考資料

  • Django Async:https://docs.djangoproject.com/en/4.2/topics/async/

上一篇
Django Channels、Async 和 Celery 的協同之舞: 透過channels建立AI聊天室
下一篇
Django SaaS未完待續...
系列文
Django 2024: 從入門到SaaS實戰31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言